{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "21f4674b-aa7b-4ec0-b442-f318c26dff80",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "# RDD的持久化"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "99437c80-94f7-42be-afdb-25a6a77766b7",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## RDD的数据是过程数据\n",
    "\n",
    "RDD之间进行相互迭代计算（Transformation转换），当执行开始后，新RDD生成，老RDD消失。\n",
    "\n",
    "RDD的数据只是过程数据，只在处理的过程中存在，一旦处理完成，就不见了。\n",
    "\n",
    "在下一次要用到RDD的数据的时候，再根据血缘关系，从头重新处理一遍RDD的数据。\n",
    "\n",
    "> 这个特性可以最大化的利用资源，老的RDD从内存中清理，给后续的计算腾出内存空间。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "f70bc02a-b74a-497f-b07a-96c25d2b0a62",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import datetime\n",
    "import time\n",
    "\n",
    "rdd1 = sc.parallelize([1,2,3])\n",
    "\n",
    "rdd2 = rdd1.map(lambda x: (x, np.random.randint(500)))\n",
    "\n",
    "rdd3 = rdd2.map(lambda x: (x[0], x[1], datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")))\n",
    "\n",
    "time.sleep(5)\n",
    "result1 = rdd3.collect()\n",
    "print(result1)\n",
    "print(result1)\n",
    "print(result1)\n",
    "\n",
    "time.sleep(5)\n",
    "result2 = rdd3.collect()\n",
    "print(result2)\n",
    "print(result2)\n",
    "print(result2)\n",
    "\n",
    "\n",
    "time.sleep(5)\n",
    "result3 = rdd3.collect()\n",
    "print(result3)\n",
    "print(result3)\n",
    "print(result3)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "a01fbaec-e544-4d63-909d-9ff2c6e0522c",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 缓存\n",
    "\n",
    "对于上述的场景 rdd1 -> rdd2 -> rdd3 执行了多次，需要优化。\n",
    "\n",
    "如果rdd3不消失，那么 rdd1 -> rdd2 -> rdd3 就不会执行多次了。\n",
    "\n",
    "**RDD的缓存技术：** Spark提供了缓存API，可以让我们通过调用API，将指定的RDD的数据保留下来。\n",
    "\n",
    "```\n",
    "rdd3.cache()                                    # 缓存到内存中\n",
    "rdd3.persist(StorageLevel.MEMORY_ONLY)          # 仅内存缓存\n",
    "rdd3.persist(StorageLevel.MEMORY_ONLY_2)        # 仅内存缓存，2个副本\n",
    "rdd3.persist(StorageLevel.DISK_ONLY)            # 仅硬盘缓存\n",
    "rdd3.persist(StorageLevel.DISK_ONLY_2)          # 仅硬盘缓存，2个副本\n",
    "rdd3.persist(StorageLevel.DISK_ONLY_3)          # 仅硬盘缓存，3个副本\n",
    "rdd3.persist(StorageLevel.MEMORY_AND_DISK)      # 先放内存，内存不够放硬盘\n",
    "rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)    # 先放内存，内存不够放硬盘，2个副本\n",
    "rdd3.persist(StorageLevel.OFF_HEAP)             # 堆外内存（系统内存）\n",
    "\n",
    "# 一般建议使用 rdd3.persist(StorageLevel.MEMORY_AND_DISK)\n",
    "\n",
    "# 主动清理缓存\n",
    "rdd3.unpersist()\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "58c1aded-acc9-4e6b-99bd-89ba279d18a7",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import datetime\n",
    "import time\n",
    "\n",
    "rdd1 = sc.parallelize([1,2,3])\n",
    "\n",
    "rdd2 = rdd1.map(lambda x: (x, np.random.randint(500)))\n",
    "\n",
    "rdd3 = rdd2.map(lambda x: (x[0], x[1], datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")))\n",
    "\n",
    "# 仅缓存到内存\n",
    "rdd3.cache()\n",
    "\n",
    "time.sleep(5)\n",
    "result1 = rdd3.collect()\n",
    "print(result1)\n",
    "print(result1)\n",
    "print(result1)\n",
    "\n",
    "time.sleep(5)\n",
    "result2 = rdd3.collect()\n",
    "print(result2)\n",
    "print(result2)\n",
    "print(result2)\n",
    "\n",
    "\n",
    "time.sleep(5)\n",
    "result3 = rdd3.collect()\n",
    "print(result3)\n",
    "print(result3)\n",
    "print(result3)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "4d942009-4ed2-45ef-ae41-24cf05fef516",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 缓存的特点\n",
    "\n",
    "* 缓存技术可以将RDD过程数据持久化保存到内存或者硬盘上\n",
    "* 这个保存在设定上是不安全的\n",
    "\n",
    "> 缓存的数据在设计上认为有丢失的风险。\n",
    "\n",
    "所以，缓存有一个特点，就是**会保留RDD之间的血缘关系**。\n",
    "\n",
    "一旦缓存丢失，可以基于血缘关系记录，重新计算这个RDD的数据。\n",
    "\n",
    "> **缓存如何丢失：**  \n",
    "> 在内存中的缓存是不安全的，比如断电\\内存不足，会把缓存清理掉，释放资源给计算  \n",
    "> 硬盘中的数据也有可能因硬盘损坏而丢失\n",
    "\n",
    "RDD的数据是按照分区，分别缓存到Executor的内存或硬盘，是分散缓存的。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "fedffa60-3483-47ca-a4d0-489cecdb370b",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## CheckPoint\n",
    "\n",
    "CheckPoint技术，也是将RDD的数据保存下来，但是它**仅支持硬盘存储**。\n",
    "\n",
    "并且，CheckPoint：\n",
    "* 它被设计认为是安全的\n",
    "* 它不保留血缘关系\n",
    "\n",
    "CheckPoint是将各个分区的数据集中保存到硬盘，而不是分散存储。\n",
    "\n",
    "一般将CheckPoint的数据保存到HDFS上，这样数据就比较安全了。\n",
    "\n",
    "CheckPoint不会立即执行，需要先执行Action，**并且CheckPoint在代码中需要放到Action之前**。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "bf009126-c737-4420-8eb4-c21db95cbc86",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 缓存和CheckPoint的对比\n",
    "\n",
    "* CheckPoint不管分区数量多少，风险是一样的；缓存的分区越多风险越高。\n",
    "* CheckPoint支持写入HDFS；缓存不支持写入HDFS。\n",
    "* CheckPoint不支持内存；缓存支持写内存，如果缓存写内存，性能比CheckPoint要好一些。\n",
    "* CheckPoint因为设计认为是安全的，所以不保留血缘关系；缓存因为设计认为是不安全的，所以保留血缘关系。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "fd7f8923-e64f-40b6-b127-1e8277fd7f63",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import datetime\n",
    "import time\n",
    "\n",
    "sc.setCheckpointDir(\"/mnt/databrickscontainer1/checkpoint\")\n",
    "\n",
    "rdd1 = sc.parallelize([1,2,3])\n",
    "\n",
    "rdd2 = rdd1.map(lambda x: (x, np.random.randint(500)))\n",
    "\n",
    "rdd3 = rdd2.map(lambda x: (x[0], x[1], datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")))\n",
    "\n",
    "rdd3.checkpoint()\n",
    "\n",
    "time.sleep(5)\n",
    "result1 = rdd3.collect()\n",
    "print(result1)\n",
    "print(result1)\n",
    "print(result1)\n",
    "\n",
    "time.sleep(5)\n",
    "result2 = rdd3.collect()\n",
    "print(result2)\n",
    "print(result2)\n",
    "print(result2)\n",
    "\n",
    "time.sleep(5)\n",
    "result3 = rdd3.collect()\n",
    "print(result3)\n",
    "print(result3)\n",
    "print(result3)\n",
    "\n",
    "time.sleep(5)\n",
    "result4 = rdd3.collect()\n",
    "print(result4)\n",
    "print(result4)\n",
    "print(result4)\n",
    "\n",
    "time.sleep(5)\n",
    "result5 = rdd3.collect()\n",
    "print(result5)\n",
    "print(result5)\n",
    "print(result5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "3f67579b-a426-47f3-a1ea-e73f5a233491",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 注意\n",
    "\n",
    "CheckPoint是一种重量级的使用，也就是RDD的重新计算成本很高的时候，或者数据量很大，我们采用CheckPoint比较合适。\n",
    "\n",
    "如果数据量小，或者RDD重新计算是非常快的，用CheckPoint就没啥必要，直接缓存就可以了。\n",
    "\n",
    "```\n",
    "    def checkpoint(self):\n",
    "        \"\"\"\n",
    "        Mark this RDD for checkpointing. It will be saved to a file inside the\n",
    "        checkpoint directory set with :meth:`SparkContext.setCheckpointDir` and\n",
    "        all references to its parent RDDs will be removed. This function must\n",
    "        be called before any job has been executed on this RDD. It is strongly\n",
    "        recommended that this RDD is persisted in memory, otherwise saving it\n",
    "        on a file will require recomputation.\n",
    "        \"\"\"\n",
    "        self.is_checkpointed = True\n",
    "        self._jrdd.rdd().checkpoint()\n",
    "```\n",
    "\n",
    "**CheckPoint需要放在对应RDD的Action之前，对RDD才有持久化的效果，放在Action之后，即便后续还有RDD上的Action操作，CheckPoint也不起作用；缓存会对缓存语句后面的Action起作用。**\n",
    "\n",
    "其他RDD的Action对当前RDD的CheckPoint没有影响。\n",
    "\n",
    "> Cache和CheckPoint两个API都不是Action类型的  \n",
    "> 所以想要他们工作，必须在后面接上Action  \n",
    "> 接Action算子的目的是让RDD有数据，而不是为了让Cache和CheckPoint工作"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "7177dfc0-7276-4cee-b693-3c1787125790",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import datetime\n",
    "import time\n",
    "\n",
    "sc.setCheckpointDir(\"/mnt/databrickscontainer1/checkpoint\")\n",
    "\n",
    "rdd1 = sc.parallelize([1,2,3])\n",
    "\n",
    "rdd2 = rdd1.map(lambda x: (x, np.random.randint(500)))\n",
    "\n",
    "rdd3 = rdd2.map(lambda x: (x[0], x[1], datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")))\n",
    "\n",
    "# 放在Action之前，看看后续RDD的Action的结果，体会一下Cache与CheckPoint的持久化效果\n",
    "# rdd3.cache()\n",
    "# rdd3.checkpoint()\n",
    "\n",
    "time.sleep(5)\n",
    "result1 = rdd3.collect()\n",
    "print(result1)\n",
    "print(result1)\n",
    "print(result1)\n",
    "\n",
    "# 放在Action之后，看看后续RDD的Action的结果，体会一下Cache与CheckPoint的持久化效果\n",
    "rdd3.cache()\n",
    "# rdd3.checkpoint()\n",
    "\n",
    "time.sleep(5)\n",
    "result2 = rdd3.collect()\n",
    "print(result2)\n",
    "print(result2)\n",
    "print(result2)\n",
    "\n",
    "time.sleep(5)\n",
    "result3 = rdd3.collect()\n",
    "print(result3)\n",
    "print(result3)\n",
    "print(result3)\n",
    "\n",
    "time.sleep(5)\n",
    "result4 = rdd3.collect()\n",
    "print(result4)\n",
    "print(result4)\n",
    "print(result4)\n",
    "\n",
    "time.sleep(5)\n",
    "result5 = rdd3.collect()\n",
    "print(result5)\n",
    "print(result5)\n",
    "print(result5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "83740376-735a-4fa2-aaff-d0e3246fa814",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import datetime\n",
    "import time\n",
    "\n",
    "sc.setCheckpointDir(\"/mnt/databrickscontainer1/checkpoint\")\n",
    "\n",
    "rdd1 = sc.parallelize([1,2,3])\n",
    "\n",
    "rdd2 = rdd1.map(lambda x: (x, np.random.randint(500)))\n",
    "\n",
    "rdd3 = rdd2.map(lambda x: (x[0], x[1], datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")))\n",
    "\n",
    "time.sleep(5)\n",
    "result1 = rdd2.collect()\n",
    "print(result1)\n",
    "print(result1)\n",
    "print(result1)\n",
    "\n",
    "# 如果CheckPoint放在其他RDD的Action之后，有没有影响？\n",
    "rdd3.checkpoint()\n",
    "\n",
    "time.sleep(5)\n",
    "result2 = rdd3.collect()\n",
    "print(result2)\n",
    "print(result2)\n",
    "print(result2)\n",
    "\n",
    "time.sleep(5)\n",
    "result3 = rdd3.collect()\n",
    "print(result3)\n",
    "print(result3)\n",
    "print(result3)\n",
    "\n",
    "time.sleep(5)\n",
    "result4 = rdd3.collect()\n",
    "print(result4)\n",
    "print(result4)\n",
    "print(result4)\n",
    "\n",
    "time.sleep(5)\n",
    "result5 = rdd3.collect()\n",
    "print(result5)\n",
    "print(result5)\n",
    "print(result5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "98ab8d84-0fa1-4ad8-ae55-ca8491e435d7",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "## 清理缓存\n",
    "\n",
    "如果不需要保留缓存了，需要手动清理以释放资源。\n",
    "\n",
    "* unpersist()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "d28b3395-8853-44ce-a4a3-99bbac70fa9e",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import datetime\n",
    "import time\n",
    "\n",
    "rdd1 = sc.parallelize([1,2,3])\n",
    "\n",
    "rdd2 = rdd1.map(lambda x: (x, np.random.randint(500)))\n",
    "\n",
    "rdd3 = rdd2.map(lambda x: (x[0], x[1], datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")))\n",
    "\n",
    "rdd3.cache()\n",
    "\n",
    "time.sleep(5)\n",
    "result1 = rdd3.collect()\n",
    "print(result1)\n",
    "print(result1)\n",
    "print(result1)\n",
    "\n",
    "time.sleep(5)\n",
    "result2 = rdd3.collect()\n",
    "print(result2)\n",
    "print(result2)\n",
    "print(result2)\n",
    "\n",
    "time.sleep(5)\n",
    "result3 = rdd3.collect()\n",
    "print(result3)\n",
    "print(result3)\n",
    "print(result3)\n",
    "\n",
    "rdd3.unpersist()\n",
    "\n",
    "time.sleep(5)\n",
    "result4 = rdd3.collect()\n",
    "print(result4)\n",
    "print(result4)\n",
    "print(result4)\n",
    "\n",
    "time.sleep(5)\n",
    "result5 = rdd3.collect()\n",
    "print(result5)\n",
    "print(result5)\n",
    "print(result5)"
   ]
  }
 ],
 "metadata": {
  "application/vnd.databricks.v1+notebook": {
   "dashboards": [],
   "language": "python",
   "notebookMetadata": {},
   "notebookName": "RDD的持久化",
   "notebookOrigID": 2180139033493924,
   "widgets": {}
  },
  "kernelspec": {
   "display_name": "",
   "name": ""
  },
  "language_info": {
   "name": ""
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
